{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## OpenCV - Pipeline Image Transformations\n",
    "\n",
    "This example shows how to manipulate the collection of images.\n",
    "First, the images are downloaded to the local directory.\n",
    "Second, they are copied to your cluster's attached HDFS."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The images are loaded from the directory (for fast prototyping, consider loading a fraction of\n",
    "images). Inside the dataframe, each image is a single field in the image column. The image has\n",
    "sub-fields (path, height, width, OpenCV type and OpenCV bytes)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from synapse.ml.opencv import toNDArray\n",
    "from synapse.ml.io import *\n",
    "\n",
    "imageDir = \"wasbs://publicwasb@mmlspark.blob.core.windows.net/sampleImages\"\n",
    "images = spark.read.image().load(imageDir).cache()\n",
    "images.printSchema()\n",
    "print(images.count())"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We can also alternatively stream the images with a similar api.\n",
    "Check the [Structured Streaming Programming Guide](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html)\n",
    "for more details on streaming."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "When collected from the *DataFrame*, the image data are stored in a *Row*, which is Spark's way\n",
    "to represent structures (in the current example, each dataframe row has a single Image, which\n",
    "itself is a Row).  It is possible to address image fields by name and use `toNDArray()` helper\n",
    "function to convert the image into numpy array for further manipulations."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from synapse.ml.core.platform import running_on_binder\n",
    "\n",
    "if running_on_binder():\n",
    "    from IPython import get_ipython\n",
    "from PIL import Image\n",
    "import matplotlib.pyplot as plt\n",
    "\n",
    "data = images.take(3)  # take first three rows of the dataframe\n",
    "im = data[2][0]  # the image is in the first column of a given row\n",
    "\n",
    "print(\"image type: {}, number of fields: {}\".format(type(im), len(im)))\n",
    "print(\"image path: {}\".format(im.origin))\n",
    "print(\"height: {}, width: {}, OpenCV type: {}\".format(im.height, im.width, im.mode))\n",
    "\n",
    "arr = toNDArray(im)  # convert to numpy array\n",
    "print(images.count())\n",
    "plt.imshow(Image.fromarray(arr, \"RGB\"))  # display the image inside notebook"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Use `ImageTransformer` for the basic image manipulation: resizing, cropping, etc.\n",
    "Internally, operations are pipelined and backed by OpenCV implementation."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from synapse.ml.opencv import ImageTransformer\n",
    "\n",
    "tr = (\n",
    "    ImageTransformer()  # images are resized and then cropped\n",
    "    .setOutputCol(\"transformed\")\n",
    "    .resize(size=(200, 200))\n",
    "    .crop(0, 0, height=180, width=180)\n",
    ")\n",
    "\n",
    "small = tr.transform(images).select(\"transformed\")\n",
    "\n",
    "im = small.take(3)[2][0]  # take third image\n",
    "plt.imshow(Image.fromarray(toNDArray(im), \"RGB\"))  # display the image inside notebook"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "For the advanced image manipulations, use Spark UDFs.\n",
    "The SynapseML package provides conversion function between *Spark Row* and\n",
    "*ndarray* image representations."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.sql.functions import udf\n",
    "from synapse.ml.opencv import ImageSchema, toNDArray, toImage\n",
    "\n",
    "\n",
    "def u(row):\n",
    "    array = toNDArray(row)  # convert Image to numpy ndarray[height, width, 3]\n",
    "    array[:, :, 2] = 0\n",
    "    return toImage(array)  # numpy array back to Spark Row structure\n",
    "\n",
    "\n",
    "noBlueUDF = udf(u, ImageSchema)\n",
    "\n",
    "noblue = small.withColumn(\"noblue\", noBlueUDF(small[\"transformed\"])).select(\"noblue\")\n",
    "\n",
    "im = noblue.take(3)[2][0]  # take second image\n",
    "plt.imshow(Image.fromarray(toNDArray(im), \"RGB\"))  # display the image inside notebook"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Images could be unrolled into the dense 1D vectors suitable for CNTK evaluation."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from synapse.ml.image import UnrollImage\n",
    "\n",
    "unroller = UnrollImage().setInputCol(\"noblue\").setOutputCol(\"unrolled\")\n",
    "\n",
    "unrolled = unroller.transform(noblue).select(\"unrolled\")\n",
    "\n",
    "vector = unrolled.take(1)[0][0]\n",
    "print(type(vector))\n",
    "len(vector.toArray())"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "anaconda-cloud": {},
  "kernelspec": {
   "display_name": "Python [default]",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
